iT邦幫忙

2025 iThome Ironman Contest

DAY 27
AI & Data

Since We're Here Anyway, Let's Build a GCP AI Assistant from 0 to 100, Part 27

Firebase + Pub/Sub Real-Time Push: Real-Time Conversations on GCP


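Yesterday we built chat-service and the RAG retrieval system, but one big user-experience problem remains: the wait is a black box. After asking a question, the user can only sit through 10-30 seconds without knowing what the system is doing.

This post uses GCP-native services to build real-time push, so the user can see the progress of every processing step.

Goal: a zero-maintenance real-time push system that supports concurrent users and still delivers notifications when they are offline.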

1) System architecture: 100% GCP managed services

flowchart TB
    subgraph "Frontend layer"
        WEB[Web App<br/>React/Vue]
        MOBILE[Mobile App<br/>Flutter/React Native]
        FIREBASE_SDK[Firebase SDK<br/>real-time listeners]
    end

    subgraph "GCP managed services layer"
        FIRESTORE[Firestore<br/>real-time database]
        FUNCTIONS[Cloud Functions<br/>event handling]
        PUBSUB[Pub/Sub<br/>event bus]
        EVENTARC[Eventarc<br/>event routing]
        FCM[Firebase Cloud Messaging<br/>push notifications]
    end

    subgraph "Application services layer"
        CHAT[chat-service]
        RAG[rag-service<br/>Agent Builder]
        WORKER[worker-service<br/>Cloud Run Jobs]
    end

    subgraph "Real-time data flow"
        WEB --> FIREBASE_SDK
        MOBILE --> FIREBASE_SDK
        FIREBASE_SDK <--> FIRESTORE

        CHAT --> PUBSUB
        RAG --> PUBSUB
        WORKER --> PUBSUB

        PUBSUB --> EVENTARC
        EVENTARC --> FUNCTIONS
        FUNCTIONS --> FIRESTORE
        FUNCTIONS --> FCM
    end

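Technology comparison: why not WebSocket?

| Requirement | ❌ Traditional WebSocket | ✅ Firebase approach | Advantage |
|---|---|---|---|
| Real-time connection | Self-hosted WebSocket server | Firestore real-time listeners | Zero maintenance, auto-scaling |
| Reconnection | Hand-written reconnect logic | Handled by the Firebase SDK | More stable |
| Load balancing | Sticky-session configuration | Distributed by Firebase | Stateless, more flexible |
| Offline support | Custom cache required | Built-in offline cache | Better user experience |
| Cross-platform | Different implementation per platform | One Firebase SDK | Faster development |
| Scalability | Handle concurrency yourself | Google infrastructure | Million-scale concurrency |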

2) Firestore real-time database design

Data structure design

// Firestore collection structure
collections:
  chats: {
    [chatId]: {
      user_id: string,
      status: 'pending' | 'processing' | 'completed' | 'error',
      current_step: string,
      progress: number,         // 0-100
      steps: {
        'receive_message': { status: 'completed', timestamp: '...' },
        'rag_search': { status: 'processing', progress: 30 },
        'llm_generation': { status: 'pending' },
        'response_ready': { status: 'pending' }
      },
      messages: [
        {
          role: 'user',
          content: string,
          timestamp: timestamp
        },
        {
          role: 'assistant',
          content: string,
          timestamp: timestamp,
          sources: [...],          // RAG sources
          partial: boolean         // whether this is partial content
        }
      ],
      metadata: {
        processing_time_ms: number,
        rag_results_count: number,
        error_message: string
      },
      created_at: timestamp,
      updated_at: timestamp
    }
  },

  user_sessions: {
    [userId]: {
      active_chats: [chatId, ...],
      preferences: {...},
      last_seen: timestamp
    }
  }
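
The schema above can be sketched as a plain Python builder, which is handy for checking the document shape before anything is written to Firestore. This is only an illustration: `STEP_ORDER`, `build_initial_chat`, and `completed_steps` are hypothetical names, and the step order is an assumption read off the schema.

```python
from datetime import datetime, timezone
from typing import Any, Dict, List

# Step names copied from the schema above; their order is an assumption.
STEP_ORDER = ["receive_message", "rag_search", "llm_generation", "response_ready"]


def build_initial_chat(user_id: str, message: str) -> Dict[str, Any]:
    """Return a dict shaped like a new chats/{chatId} document."""
    now = datetime.now(timezone.utc)
    steps: Dict[str, Dict[str, Any]] = {name: {"status": "pending"} for name in STEP_ORDER}
    # The first step is already done by the time the document is created.
    steps["receive_message"] = {"status": "completed", "timestamp": now}
    return {
        "user_id": user_id,
        "status": "pending",
        "current_step": "receive_message",
        "progress": 0,
        "steps": steps,
        "messages": [{"role": "user", "content": message, "timestamp": now}],
        "metadata": {},
        "created_at": now,
        "updated_at": now,
    }


def completed_steps(chat: Dict[str, Any]) -> List[str]:
    """List the steps whose status is 'completed', in pipeline order."""
    return [name for name in STEP_ORDER if chat["steps"][name]["status"] == "completed"]
```

Because every message lives in the `messages` array of one document, each listener update carries the whole conversation; that keeps the frontend simple at the cost of larger snapshots for long chats.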

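Firestore security rules

// firestore.rules
rules_version = '2';
service cloud.firestore {
  match /databases/{database}/documents {
    // Chat documents: only the owning user may access an existing chat.
    // `resource` is null when the document does not exist, so these rules
    // deny client-side creates; the backend writes through the Admin SDK,
    // which bypasses security rules.
    match /chats/{chatId} {
      allow read, write: if request.auth != null &&
        request.auth.uid == resource.data.user_id;
    }

    // User session documents: only the user themselves
    match /user_sessions/{userId} {
      allow read, write: if request.auth != null &&
        request.auth.uid == userId;
    }

    // Explicit deny for everything else. Rules are additive, so this does not
    // override the allows above; unmatched paths are denied by default anyway.
    match /{document=**} {
      allow read, write: if false;
    }
  }
}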
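
To make the intent of these rules easier to reason about, here is a rough Python mirror of the two conditions. It is not the rules engine, only a sketch of the same predicates; `can_access_chat` and `can_access_session` are hypothetical helper names.

```python
from typing import Any, Dict, Optional


def can_access_chat(auth_uid: Optional[str], chat_doc: Optional[Dict[str, Any]]) -> bool:
    """Mirror of the /chats/{chatId} condition.

    chat_doc is None when the document does not exist, which corresponds to
    `resource` being null in the rules: the condition cannot be evaluated and
    the request is denied.
    """
    if auth_uid is None or chat_doc is None:
        return False
    return chat_doc.get("user_id") == auth_uid


def can_access_session(auth_uid: Optional[str], session_user_id: str) -> bool:
    """Mirror of the /user_sessions/{userId} condition."""
    return auth_uid is not None and auth_uid == session_user_id
```

One consequence worth noting: a client that listens to a chat document before the backend has created it falls into the `chat_doc is None` case.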

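Firestore setup script

#!/bin/bash
# scripts/setup-firestore.sh

PROJECT_ID="your-project-id"

echo "🔥 Setting up Firebase and Firestore..."

# 1. Enable the Firebase and Firestore APIs
gcloud services enable firebase.googleapis.com --project="$PROJECT_ID"
gcloud services enable firestore.googleapis.com --project="$PROJECT_ID"

# 2. Add Firebase to the GCP project (if not done yet)
firebase projects:addfirebase "$PROJECT_ID"

# 3. Create the Firestore database
#    (current gcloud releases take --location; older ones used --region)
gcloud firestore databases create --location=asia-east1 --project="$PROJECT_ID"

# 4. Deploy the security rules
firebase deploy --only firestore:rules --project "$PROJECT_ID"

# 5. Create the composite index
gcloud firestore indexes composite create \
    --project="$PROJECT_ID" \
    --collection-group=chats \
    --field-config=field-path=user_id,order=ascending \
    --field-config=field-path=updated_at,order=descending

echo "✅ Firestore setup complete"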

3) Backend service integration: pushing progress updates

Firebase Admin SDK initialization

# shared/firebase_client.py
import firebase_admin
from firebase_admin import credentials, firestore, messaging
from typing import Dict, Any, Optional, List
import logging
import json
from datetime import datetime

logger = logging.getLogger(__name__)

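class FirebaseClient:
    """Single entry point for Firebase access.

    Note: the Admin SDK Firestore client used here is synchronous, so the
    async methods below block the event loop while a call is in flight.
    """

    def __init__(self, project_id: str):
        self.project_id = project_id

        # Initialize Firebase Admin (only once per process)
        if not firebase_admin._apps:
            cred = credentials.ApplicationDefault()
            firebase_admin.initialize_app(cred, {
                'projectId': project_id
            })

        # Initialize the Firestore client
        self.db = firestore.client()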

    async def create_chat_session(self, chat_id: str, user_id: str, initial_message: str) -> bool:
        """Create a new chat session."""
        try:
            chat_ref = self.db.collection('chats').document(chat_id)

            chat_data = {
                'user_id': user_id,
                'status': 'pending',
                'current_step': 'receive_message',
                'progress': 0,
                'steps': {
                    'receive_message': {
                        'status': 'completed',
                        'timestamp': datetime.now()
                    },
                    'rag_search': {
                        'status': 'pending'
                    },
                    'llm_generation': {
                        'status': 'pending'
                    },
                    'response_ready': {
                        'status': 'pending'
                    }
                },
                'messages': [
                    {
                        'role': 'user',
                        'content': initial_message,
                        'timestamp': datetime.now()
                    }
                ],
                'metadata': {},
                'created_at': datetime.now(),
                'updated_at': datetime.now()
            }

            chat_ref.set(chat_data)

            # Update the user's session record
            await self._update_user_session(user_id, chat_id)

            return True

        except Exception as e:
            logger.error(f"Failed to create chat session: {e}")
            return False

    async def update_chat_progress(
        self,
        chat_id: str,
        step: str,
        progress: int,
        status: str = 'processing',
        message: str = None,
        metadata: Dict[str, Any] = None
    ):
        """Update chat progress."""
        try:
            chat_ref = self.db.collection('chats').document(chat_id)

            # Build the update payload
            update_data = {
                'current_step': step,
                'progress': progress,
                'status': status,
                f'steps.{step}.status': status,
                f'steps.{step}.timestamp': datetime.now(),
                'updated_at': datetime.now()
            }

            # `is not None` so that a progress of 0 is still recorded
            if progress is not None:
                update_data[f'steps.{step}.progress'] = progress

            if message:
                update_data[f'steps.{step}.message'] = message

            if metadata:
                for key, value in metadata.items():
                    update_data[f'metadata.{key}'] = value

            # A single update() call applies all fields atomically
            chat_ref.update(update_data)

            logger.info(f"Chat {chat_id} progress updated: {step} - {progress}%")

        except Exception as e:
            logger.error(f"Failed to update chat progress: {e}")

    async def add_message(
        self,
        chat_id: str,
        role: str,
        content: str,
        partial: bool = False,
        sources: List[Dict] = None
    ):
        """Append a message to the chat."""
        try:
            chat_ref = self.db.collection('chats').document(chat_id)

            new_message = {
                'role': role,
                'content': content,
                'timestamp': datetime.now(),
                'partial': partial
            }

            if sources:
                new_message['sources'] = sources

            # Append the message with ArrayUnion (exact duplicate elements are ignored)
            chat_ref.update({
                'messages': firestore.ArrayUnion([new_message]),
                'updated_at': datetime.now()
            })

        except Exception as e:
            logger.error(f"Failed to add message: {e}")

    async def complete_chat(
        self,
        chat_id: str,
        final_response: str,
        processing_time_ms: int,
        sources: List[Dict] = None
    ):
        """Complete the chat session."""
        try:
            chat_ref = self.db.collection('chats').document(chat_id)

            # Append the final response
            if final_response:
                await self.add_message(
                    chat_id,
                    'assistant',
                    final_response,
                    partial=False,
                    sources=sources
                )

            # Mark the chat as completed
            chat_ref.update({
                'status': 'completed',
                'current_step': 'response_ready',
                'progress': 100,
                'steps.response_ready.status': 'completed',
                'steps.response_ready.timestamp': datetime.now(),
                'metadata.processing_time_ms': processing_time_ms,
                'updated_at': datetime.now()
            })

        except Exception as e:
            logger.error(f"Failed to complete chat: {e}")

    async def error_chat(self, chat_id: str, error_message: str):
        """Mark the chat as failed."""
        try:
            chat_ref = self.db.collection('chats').document(chat_id)

            chat_ref.update({
                'status': 'error',
                'metadata.error_message': error_message,
                'updated_at': datetime.now()
            })

        except Exception as e:
            logger.error(f"Failed to set chat error status: {e}")

    async def _update_user_session(self, user_id: str, chat_id: str):
        """Update the user's session record."""
        try:
            user_ref = self.db.collection('user_sessions').document(user_id)

            # set(..., merge=True) creates the document when it is missing and
            # otherwise merges these fields, so no update()/set() fallback is
            # needed and existing preferences are left untouched.
            user_ref.set({
                'active_chats': firestore.ArrayUnion([chat_id]),
                'last_seen': datetime.now()
            }, merge=True)

        except Exception as e:
            logger.error(f"Failed to update user session: {e}")

# Global Firebase client instance
firebase_client: Optional[FirebaseClient] = None

def get_firebase_client(project_id: str = None) -> FirebaseClient:
    """Return the shared Firebase client, creating it on first use."""
    global firebase_client

    if firebase_client is None:
        if not project_id:
            import os
            project_id = os.getenv('GCP_PROJECT_ID')

        firebase_client = FirebaseClient(project_id)

    return firebase_client
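
The dotted field paths used by `update_chat_progress` (for example `steps.rag_search.status`) update one nested field without overwriting its siblings. One way to make that payload testable without touching Firestore is to build it in a pure function; the sketch below assumes hypothetical names (`build_progress_update`, the `now` parameter) that are not part of the client above.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def build_progress_update(
    step: str,
    progress: int,
    status: str = "processing",
    message: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None,
    now: Optional[datetime] = None,
) -> Dict[str, Any]:
    """Build the flat dict of field paths for one progress update."""
    now = now or datetime.now(timezone.utc)
    update: Dict[str, Any] = {
        "current_step": step,
        "progress": progress,
        "status": status,
        # Dotted keys address nested fields inside the `steps` map.
        f"steps.{step}.status": status,
        f"steps.{step}.progress": progress,
        f"steps.{step}.timestamp": now,
        "updated_at": now,
    }
    if message:
        update[f"steps.{step}.message"] = message
    for key, value in (metadata or {}).items():
        update[f"metadata.{key}"] = value
    return update
```

The resulting dict could be handed to a document reference's `update()` as-is, which keeps the Firestore call itself down to one line.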

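Updating chat-service to integrate Firebase

# services/chat/app/handlers.py (Firebase-integrated version)
import asyncio
import json
import logging
import time
import uuid
from datetime import datetime
from typing import Dict, Any
from shared.firebase_client import get_firebase_client
# ProcessingMode is assumed to live in .models alongside the request/response types
from .models import ChatRequest, ChatResponse, ProcessingMode

logger = logging.getLogger(__name__)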

class ChatHandler:
    def __init__(self):
        # ... existing initialization ...
        self.firebase = get_firebase_client()

    async def process_chat(self, request: ChatRequest) -> ChatResponse:
        """Main chat handling logic (Firebase-integrated version)."""
        start_time = time.time()
        chat_id = request.chat_id or str(uuid.uuid4())

        try:
            # 1. Create the chat session in Firebase
            await self.firebase.create_chat_session(
                chat_id,
                request.user_id,
                request.message
            )

            # 2. Load the user's context
            await self.firebase.update_chat_progress(
                chat_id,
                'load_context',
                10,
                'processing',
                'Loading conversation memory...'
            )

            context = await self._load_user_context(request.user_id, chat_id)

            # 3. Decide the processing mode
            processing_mode = self._determine_processing_mode(request.message, context)

            if processing_mode == ProcessingMode.SYNC:
                return await self._handle_sync_firebase(request, chat_id, context, start_time)
            else:
                return await self._handle_async_firebase(request, chat_id, context, start_time)

        except Exception as e:
            await self.firebase.error_chat(chat_id, str(e))
            raise

    async def _handle_sync_firebase(
        self,
        request: ChatRequest,
        chat_id: str,
        context: Dict,
        start_time: float
    ) -> ChatResponse:
        """Synchronous handling (Firebase version)."""
        try:
            # Progress update: LLM generation starts
            await self.firebase.update_chat_progress(
                chat_id,
                'llm_generation',
                50,
                'processing',
                'Generating response...'
            )

            # Build the prompt and call the LLM
            prompt = self._build_simple_prompt(request.message, context)
            response = await self._call_gemini(prompt, max_tokens=150)

            # Save the conversation
            await self._save_conversation(chat_id, request.user_id, request.message, response)

            # Mark the chat as completed
            processing_time = int((time.time() - start_time) * 1000)
            await self.firebase.complete_chat(
                chat_id,
                response,
                processing_time
            )

            return ChatResponse(
                message=response,
                chat_id=chat_id,
                processing_mode=ProcessingMode.SYNC,
                is_complete=True,
                requires_followup=False
            )

        except Exception as e:
            await self.firebase.error_chat(chat_id, str(e))
            raise

    async def _handle_async_firebase(
        self,
        request: ChatRequest,
        chat_id: str,
        context: Dict,
        start_time: float
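    ) -> ChatResponse:
        """Asynchronous handling (Firebase version)."""
        try:
            # Reply to the user right away
            quick_response = "I've received your question and am working on it..."

            # Progress update: task scheduled. Written before publishing so a
            # fast worker cannot have its later progress overwritten by this one.
            await self.firebase.update_chat_progress(
                chat_id,
                'task_scheduled',
                20,
                'processing',
                'Task scheduled, retrieving relevant documents...'
            )

            # Publish to Pub/Sub for background processing
            await self._publish_task_event_firebase(request, chat_id, context)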
            )

            return ChatResponse(
                message=quick_response,
                chat_id=chat_id,
                processing_mode=ProcessingMode.ASYNC,
                is_complete=False,
                requires_followup=True,
                metadata={"estimated_time": "10-30 seconds"}
            )

        except Exception as e:
            await self.firebase.error_chat(chat_id, str(e))
            raise

    async def _publish_task_event_firebase(
        self,
        request: ChatRequest,
        chat_id: str,
        context: Dict
    ):
        """Publish the task event to Pub/Sub (including Firebase info)."""
        if not self.publisher:
            logger.info("Development environment: simulating task event publish")
            return

        try:
            event = {
                "task_id": str(uuid.uuid4()),
                "chat_id": chat_id,
                "user_id": request.user_id,
                "message": request.message,
                "context": context,
                "firebase_enabled": True,  # flag: Firebase updates are enabled
                "created_at": datetime.now().isoformat()
            }

            message_data = json.dumps(event).encode('utf-8')
            future = self.publisher.publish(self.topic_path, message_data)

            # future.result() blocks until the publish completes and returns the message ID
            logger.info(f"Task event published to Pub/Sub: {future.result()}")

        except Exception as e:
            logger.error(f"Failed to publish task event: {e}")
            await self.firebase.error_chat(chat_id, f"Failed to publish task event: {e}")
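
The event published here reaches the Cloud Function wrapped in a Pub/Sub envelope, with the payload base64-encoded under `message.data`. The round trip can be sketched without any GCP dependency as follows; the helper names (`encode_task_event`, `wrap_as_pubsub_envelope`, `decode_task_event`) are hypothetical and the envelope is reduced to the single field the handler reads.

```python
import base64
import json
from typing import Any, Dict


def encode_task_event(event: Dict[str, Any]) -> bytes:
    """Serialize the event the way the publisher does: JSON, then UTF-8 bytes."""
    return json.dumps(event, ensure_ascii=False).encode("utf-8")


def wrap_as_pubsub_envelope(payload: bytes) -> Dict[str, Any]:
    """Simplified stand-in for cloud_event.data: the payload is base64 text."""
    return {"message": {"data": base64.b64encode(payload).decode("ascii")}}


def decode_task_event(envelope: Dict[str, Any]) -> Dict[str, Any]:
    """Reverse the two steps above, as the function handler must."""
    raw = base64.b64decode(envelope["message"]["data"])
    return json.loads(raw.decode("utf-8"))
```

A round trip such as `decode_task_event(wrap_as_pubsub_envelope(encode_task_event(event)))` should give back the original dict, which makes a cheap unit test for the publisher and the handler agreeing on the format.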

4) Cloud Functions event handling

Main event handler function

# functions/chat_event_handler/main.py
import functions_framework
from google.cloud import pubsub_v1
import base64
import json
import asyncio
import logging
from shared.firebase_client import get_firebase_client
from services.rag.app.agent_builder_client import AgentBuilderClient
import google.generativeai as genai

logger = logging.getLogger(__name__)

# Global initialization (reused across warm invocations)
firebase_client = get_firebase_client()
agent_builder = AgentBuilderClient(project_id="your-project-id")

@functions_framework.cloud_event
def handle_chat_task(cloud_event):
    """Handle a chat task event."""
    try:
        # Parse the Pub/Sub message (the payload arrives base64-encoded)
        message_data = cloud_event.data["message"]["data"]
        task_data = json.loads(base64.b64decode(message_data).decode())

        # Run the async processing
        asyncio.run(process_chat_task(task_data))

    except Exception as e:
        # Swallowing the exception acknowledges the message, so Pub/Sub will not retry it
        logger.error(f"Failed to handle chat task: {e}")
async def process_chat_task(task_data: dict):
    """Main logic for processing a chat task."""
    chat_id = task_data["chat_id"]
    user_id = task_data["user_id"]
    message = task_data["message"]

    try:
        # Step 1: RAG retrieval
        await firebase_client.update_chat_progress(
            chat_id,
            'rag_search',
            30,
            'processing',
            'Retrieving relevant documents...'
        )

        rag_results = await perform_rag_search(message, user_id)

        # Step 2: LLM generation
        await firebase_client.update_chat_progress(
            chat_id,
            'llm_generation',
            60,
            'processing',
            'Generating a detailed answer...'
        )

        final_response = await generate_rag_response(message, rag_results)

        # Step 3: streamed response (optional)
        if len(final_response) > 500:
            await stream_response(chat_id, final_response)

        # Step 4: finish processing
        await firebase_client.complete_chat(
            chat_id,
            final_response,
            processing_time_ms=0,  # hard to measure precisely inside a Cloud Function
            sources=extract_sources(rag_results)
        )

    except Exception as e:
        await firebase_client.error_chat(chat_id, str(e))
        logger.error(f"Task processing failed {chat_id}: {e}")

async def perform_rag_search(query: str, user_id: str) -> dict:
    """Run the RAG retrieval."""
    try:
        # Retrieve with Agent Builder
        results = await agent_builder.search_documents(
            query=query,
            page_size=5
        )

        return results

    except Exception as e:
        logger.error(f"RAG retrieval failed: {e}")
        return {"results": []}

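async def generate_rag_response(query: str, rag_results: dict) -> str:
    """Generate a response grounded in the RAG results."""
    try:
        # Build the augmented prompt
        context_parts = []
        for result in rag_results.get("results", [])[:3]:
            context_parts.append(f"Reference: {result.get('content', '')[:300]}")

        enhanced_prompt = f"""
Answer the question based on the following references:

{chr(10).join(context_parts)}

Question: {query}

Provide a detailed, accurate answer and cite the references at the end.
        """.strip()

        # Call the Gemini API. The key is read from the GEMINI_API_KEY
        # environment variable instead of being hard-coded in the source.
        import os
        genai.configure(api_key=os.environ["GEMINI_API_KEY"])
        model = genai.GenerativeModel('gemini-pro')

        response = model.generate_content(enhanced_prompt)
        return response.text

    except Exception as e:
        logger.error(f"Failed to generate response: {e}")
        return "Sorry, I ran into a problem while processing your question. Please try again later."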

async def stream_response(chat_id: str, full_response: str):
    """Push a long response in chunks."""
    try:
        # Split the long response into chunks of 200 characters
        chunks = [full_response[i:i+200] for i in range(0, len(full_response), 200)]

        for i, chunk in enumerate(chunks):
            progress = int((i + 1) / len(chunks) * 30) + 70  # 70-100%

            # Append the partial response
            await firebase_client.add_message(
                chat_id,
                'assistant',
                chunk,
                partial=True
            )

            # Update progress
            await firebase_client.update_chat_progress(
                chat_id,
                'streaming_response',
                progress,
                'processing',
                f'Pushing response... ({i+1}/{len(chunks)})'
            )

            # Short delay to simulate a typing effect
            await asyncio.sleep(0.5)

    except Exception as e:
        logger.error(f"Streaming response failed: {e}")

def extract_sources(rag_results: dict) -> list:
    """Extract source information from the RAG results."""
    sources = []

    for result in rag_results.get("results", [])[:3]:
        sources.append({
            "title": result.get("title", "Unknown source"),
            "content_preview": result.get("content", "")[:100] + "...",
            "metadata": result.get("metadata", {})
        })

    return sources
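
The chunking and progress mapping inside `stream_response` can be pulled out into a pure function and checked in isolation. The sketch below is a variation rather than the code above: `plan_chunks` is a hypothetical name, and it uses integer arithmetic for the 70-100% range instead of the float expression in `stream_response`.

```python
from typing import List, Tuple


def plan_chunks(
    text: str,
    chunk_size: int = 200,
    start: int = 70,
    end: int = 100,
) -> List[Tuple[str, int]]:
    """Split text into fixed-size chunks and pair each with a progress value.

    Progress rises from just above `start` to exactly `end` on the last chunk.
    """
    chunks = [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]
    span = end - start
    return [
        (chunk, start + (index + 1) * span // len(chunks))
        for index, chunk in enumerate(chunks)
    ]
```

With this split, the async loop only has to iterate over the plan and call `add_message` and `update_chat_progress` for each pair.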

Cloud Functions deployment configuration

# functions/chat_event_handler/function.yaml
name: chat-event-handler
runtime: python310
source: .
entry_point: handle_chat_task

trigger:
  event_trigger:
    event_type: google.cloud.pubsub.topic.v1.messagePublished
    resource: projects/YOUR_PROJECT/topics/chat-tasks

environment_variables:
  GCP_PROJECT_ID: "your-project-id"
  GEMINI_API_KEY: "your-gemini-api-key"

resources:
  memory: 512MB
  cpu: 1
  timeout: 540s

service_account_email: "chat-functions-sa@your-project.iam.gserviceaccount.com"

Deploying Cloud Functions

#!/bin/bash
# scripts/deploy-functions.sh

PROJECT_ID="your-project-id"

echo "☁️ Deploying Cloud Functions..."

# 1. Create the service account
gcloud iam service-accounts create chat-functions-sa \
    --display-name="Chat Functions Service Account"

# 2. Grant permissions
#    (Firestore access is granted through roles/datastore.user)
gcloud projects add-iam-policy-binding "$PROJECT_ID" \
    --member="serviceAccount:chat-functions-sa@$PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/datastore.user"

gcloud projects add-iam-policy-binding "$PROJECT_ID" \
    --member="serviceAccount:chat-functions-sa@$PROJECT_ID.iam.gserviceaccount.com" \
    --role="roles/pubsub.subscriber"

# 3. Deploy the function
#    --gen2 because the handler uses the CloudEvent signature
#    (@functions_framework.cloud_event).
#    GEMINI_API_KEY must be supplied as well, for example from Secret Manager
#    via --set-secrets.
gcloud functions deploy chat-event-handler \
    --gen2 \
    --runtime=python310 \
    --trigger-topic=chat-tasks \
    --source=functions/chat_event_handler \
    --entry-point=handle_chat_task \
    --service-account="chat-functions-sa@$PROJECT_ID.iam.gserviceaccount.com" \
    --set-env-vars="GCP_PROJECT_ID=$PROJECT_ID" \
    --memory=512MB \
    --timeout=540s \
    --region=asia-east1

echo "✅ Cloud Functions deployment complete"

5) Frontend implementation: React + Firebase SDK

Firebase configuration

// frontend/src/firebase/config.js
import { initializeApp } from 'firebase/app';
import { getFirestore } from 'firebase/firestore';
import { getAuth } from 'firebase/auth';
import { getMessaging } from 'firebase/messaging';

const firebaseConfig = {
  apiKey: "your-api-key",
  authDomain: "your-project.firebaseapp.com",
  projectId: "your-project-id",
  storageBucket: "your-project.appspot.com",
  messagingSenderId: "123456789",
  appId: "your-app-id"
};

// Initialize Firebase
const app = initializeApp(firebaseConfig);

// Initialize services
export const db = getFirestore(app);
export const auth = getAuth(app);
export const messaging = getMessaging(app);

export default app;

Real-time chat component

// frontend/src/components/ChatInterface.jsx
import React, { useState, useEffect, useRef } from 'react';
import { doc, onSnapshot } from 'firebase/firestore';
import { db } from '../firebase/config';
import { useAuth } from '../hooks/useAuth';

const ChatInterface = () => {
  const [messages, setMessages] = useState([]);
  const [inputMessage, setInputMessage] = useState('');
  const [chatStatus, setChatStatus] = useState(null);
  const [currentProgress, setCurrentProgress] = useState(0);
  const [isLoading, setIsLoading] = useState(false);
  // Kept in state (not a ref) so that changing it re-runs the listener effect
  const [currentChatId, setCurrentChatId] = useState(null);
  const { user } = useAuth();
  const messagesEndRef = useRef(null);

  // Scroll to the latest message
  const scrollToBottom = () => {
    messagesEndRef.current?.scrollIntoView({ behavior: "smooth" });
  };

  useEffect(() => {
    scrollToBottom();
  }, [messages]);

  // Listen for chat status updates
  useEffect(() => {
    if (!currentChatId) return;

    const unsubscribe = onSnapshot(
      doc(db, 'chats', currentChatId),
      (snapshot) => {
        if (snapshot.exists()) {
          const data = snapshot.data();

          // Update messages
          if (data.messages) {
            setMessages(data.messages);
          }

          // Update progress and status
          setChatStatus(data.status);
          setCurrentProgress(data.progress || 0);

          // Stop the loading state once the chat is finished
          if (data.status === 'completed' || data.status === 'error') {
            setIsLoading(false);
          }
        }
      },
      (error) => {
        console.error('Failed to listen for chat updates:', error);
        setIsLoading(false);
      }
    );

    return () => unsubscribe();
  }, [currentChatId]);

  // Send a message
  const sendMessage = async () => {
    if (!inputMessage.trim() || isLoading || !user) return;

    setIsLoading(true);
    const messageText = inputMessage;
    setInputMessage('');

    try {
      // Generate a new chat ID (a UUID avoids the collisions a timestamp can produce)
      const chatId = crypto.randomUUID();

      // Call the backend API to start processing
      const response = await fetch('/api/chat', {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'Authorization': `Bearer ${await user.getIdToken()}`
        },
        body: JSON.stringify({
          message: messageText,
          user_id: user.uid,
          chat_id: chatId,
          processing_mode: 'async'
        })
      });

      if (!response.ok) {
        throw new Error('Failed to send message');
      }

      // Subscribe only after the backend has created the chat document:
      // the security rules read resource.data.user_id, so a listener on a
      // document that does not exist yet is expected to be rejected.
      setCurrentChatId(chatId);

      // The backend handles the Firestore updates; the frontend only listens

    } catch (error) {
      console.error('Failed to send message:', error);
      setIsLoading(false);
      alert('Failed to send message, please try again later');
    }
  };

  // Handle the Enter key
  const handleKeyPress = (e) => {
    if (e.key === 'Enter' && !e.shiftKey) {
      e.preventDefault();
      sendMessage();
    }
  };

  return (
    <div className="chat-interface">
      {/* Chat area */}
      <div className="chat-messages">
        {messages.map((message, index) => (
          <div key={index} className={`message ${message.role}`}>
            <div className="message-content">
              {message.content}
              {message.partial && <span className="typing-indicator">...</span>}
            </div>
            {message.sources && (
              <div className="message-sources">
                <strong>Sources:</strong>
                {message.sources.map((source, idx) => (
                  <span key={idx} className="source-tag">
                    {source.title}
                  </span>
                ))}
              </div>
            )}
            <div className="message-time">
              {message.timestamp?.toDate().toLocaleTimeString()}
            </div>
          </div>
        ))}

        {/* Loading indicator and progress bar */}
        {isLoading && (
          <div className="loading-indicator">
            <div className="progress-container">
              <div className="progress-bar">
                <div
                  className="progress-fill"
                  style={{ width: `${currentProgress}%` }}
                ></div>
              </div>
              <div className="progress-text">
                {getProgressMessage(chatStatus, currentProgress)}
              </div>
            </div>
          </div>
        )}

        <div ref={messagesEndRef} />
      </div>

      {/* Input area */}
      <div className="chat-input">
        <textarea
          value={inputMessage}
          onChange={(e) => setInputMessage(e.target.value)}
          onKeyDown={handleKeyPress}
          placeholder="Type your question..."
          disabled={isLoading}
          rows="3"
        />
        <button
          onClick={sendMessage}
          disabled={isLoading || !inputMessage.trim()}
          className="send-button"
        >
          {isLoading ? 'Processing...' : 'Send'}
        </button>
      </div>
    </div>
  );
};

// Map the current progress to a status message
const getProgressMessage = (status, progress) => {
  if (progress < 20) return 'Receiving your question...';
  if (progress < 40) return 'Retrieving relevant documents...';
  if (progress < 70) return 'Analyzing and generating the answer...';
  if (progress < 90) return 'Polishing the answer...';
  return 'Almost done...';
};

export default ChatInterface;

CSS styles

/* frontend/src/components/ChatInterface.css */
.chat-interface {
  display: flex;
  flex-direction: column;
  height: 100vh;
  max-width: 800px;
  margin: 0 auto;
  border: 1px solid #e0e0e0;
  border-radius: 8px;
  overflow: hidden;
}

.chat-messages {
  flex: 1;
  overflow-y: auto;
  padding: 20px;
  background-color: #f8f9fa;
}

.message {
  margin-bottom: 15px;
  animation: fadeIn 0.3s ease-in;
}

.message.user {
  text-align: right;
}

.message.assistant {
  text-align: left;
}

.message-content {
  display: inline-block;
  max-width: 70%;
  padding: 12px 16px;
  border-radius: 18px;
  word-wrap: break-word;
}

.message.user .message-content {
  background-color: #007bff;
  color: white;
}

.message.assistant .message-content {
  background-color: white;
  border: 1px solid #e0e0e0;
  color: #333;
}

.typing-indicator {
  animation: pulse 1.5s infinite;
  color: #666;
}

.message-sources {
  margin-top: 8px;
  font-size: 12px;
  color: #666;
}

.source-tag {
  background-color: #e3f2fd;
  padding: 2px 6px;
  border-radius: 10px;
  margin-right: 5px;
  font-size: 10px;
}

.message-time {
  font-size: 11px;
  color: #999;
  margin-top: 5px;
}

.loading-indicator {
  text-align: center;
  padding: 20px;
  background-color: white;
  border-radius: 10px;
  margin: 10px 0;
  border: 1px solid #e0e0e0;
}

.progress-container {
  margin: 10px 0;
}

.progress-bar {
  width: 100%;
  height: 6px;
  background-color: #e0e0e0;
  border-radius: 3px;
  overflow: hidden;
}

.progress-fill {
  height: 100%;
  background: linear-gradient(90deg, #007bff, #0056b3);
  border-radius: 3px;
  transition: width 0.3s ease;
  animation: shimmer 2s infinite;
}

.progress-text {
  margin-top: 10px;
  color: #666;
  font-size: 14px;
}

.chat-input {
  display: flex;
  padding: 20px;
  border-top: 1px solid #e0e0e0;
  background-color: white;
}

.chat-input textarea {
  flex: 1;
  border: 1px solid #ddd;
  border-radius: 20px;
  padding: 12px 16px;
  font-size: 14px;
  resize: none;
  outline: none;
}

.chat-input textarea:focus {
  border-color: #007bff;
}

.send-button {
  margin-left: 10px;
  background-color: #007bff;
  color: white;
  border: none;
  border-radius: 20px;
  padding: 12px 24px;
  cursor: pointer;
  font-size: 14px;
  transition: background-color 0.2s;
}

.send-button:hover:not(:disabled) {
  background-color: #0056b3;
}

.send-button:disabled {
  background-color: #ccc;
  cursor: not-allowed;
}

/* Animations */
@keyframes fadeIn {
  from { opacity: 0; transform: translateY(10px); }
  to { opacity: 1; transform: translateY(0); }
}

@keyframes pulse {
  0%, 100% { opacity: 1; }
  50% { opacity: 0.5; }
}

@keyframes shimmer {
  0% { background-position: -200px 0; }
  100% { background-position: 200px 0; }
}

.progress-fill {
  background: linear-gradient(
    90deg,
    #007bff 25%,
    #66b3ff 50%,
    #007bff 75%
  );
  background-size: 200px 100%;
  animation: shimmer 2s infinite;
}

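6) Firebase Cloud Messaging: offline push

FCM setup and Service Worker

// frontend/public/firebase-messaging-sw.js
// Note: these bare module imports only resolve if this file goes through a
// bundler; a file served as-is from public/ would need importScripts instead.
import { initializeApp } from 'firebase/app';
import { getMessaging, onBackgroundMessage } from 'firebase/messaging/sw';

const firebaseConfig = {
  // Your Firebase configuration
};

const app = initializeApp(firebaseConfig);
const messaging = getMessaging(app);

// Handle background messages
onBackgroundMessage(messaging, (payload) => {
  console.log('Background push received:', payload);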

  const notificationTitle = payload.notification.title;
  const notificationOptions = {
    body: payload.notification.body,
    icon: '/icon-192x192.png',
    badge: '/badge-72x72.png',
    data: payload.data,
    actions: [
      {
        action: 'open_chat',
        title: 'View conversation'
      }
    ]
  };

  self.registration.showNotification(notificationTitle, notificationOptions);
});

// Handle notification clicks
self.addEventListener('notificationclick', (event) => {
  event.notification.close();

  if (event.action === 'open_chat') {
    const chatId = event.notification.data.chat_id;
    const url = `${self.location.origin}/chat/${chatId}`;

    event.waitUntil(
      clients.openWindow(url)
    );
  }
});

FCM permission request and token management

// frontend/src/hooks/useNotifications.js
import { useState, useEffect } from 'react';
import { getMessaging, getToken, onMessage } from 'firebase/messaging';
import { messaging } from '../firebase/config';

export const useNotifications = (user) => {
  const [notificationPermission, setNotificationPermission] = useState(
    Notification.permission
  );
  const [fcmToken, setFcmToken] = useState(null);

  useEffect(() => {
    if (!user) return;

    requestNotificationPermission();
    // onMessage returns an unsubscribe function; use it as the effect cleanup
    const unsubscribe = setupMessageListener();
    return () => unsubscribe();
  }, [user]);

  const requestNotificationPermission = async () => {
    try {
      const permission = await Notification.requestPermission();
      setNotificationPermission(permission);

      if (permission === 'granted') {
        const token = await getToken(messaging, {
          vapidKey: 'your-vapid-key'
        });

        setFcmToken(token);

        // Send the token to the backend for storage
        await saveFcmToken(user.uid, token);
      }
    } catch (error) {
      console.error('Failed to get notification permission:', error);
    }
  };

  const setupMessageListener = () => {
    return onMessage(messaging, (payload) => {
      console.log('Foreground push received:', payload);

      // Show a browser notification. Read Notification.permission directly:
      // the state value captured by this closure may be stale.
      if (Notification.permission === 'granted' && payload.notification) {
        new Notification(payload.notification.title, {
          body: payload.notification.body,
          icon: payload.notification.icon,
          data: payload.data
        });
      }
    });
  };

  const saveFcmToken = async (userId, token) => {
    try {
      await fetch('/api/user/fcm-token', {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'Authorization': `Bearer ${await user.getIdToken()}`
        },
        body: JSON.stringify({ userId, fcmToken: token })
      });
    } catch (error) {
      console.error('儲存 FCM Token 失敗:', error);
    }
  };

  return {
    notificationPermission,
    fcmToken,
    requestNotificationPermission
  };
};
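
The hook POSTs the token to `/api/user/fcm-token`, but the backend side of that call is not shown. Below is a minimal sketch of what the storage logic might look like, assuming an in-memory store. The class `TokenRegistry` and its method names are hypothetical, and a real service would presumably persist tokens in Firestore or PostgreSQL instead.

```python
from collections import defaultdict
from typing import Dict, List, Set


class TokenRegistry:
    """Hypothetical in-memory store mapping user IDs to FCM tokens."""

    def __init__(self) -> None:
        self._tokens: Dict[str, Set[str]] = defaultdict(set)

    def save(self, user_id: str, token: str) -> bool:
        """Store a token; return True only if it was not already known."""
        if not token:
            raise ValueError("token must be a non-empty string")
        is_new = token not in self._tokens[user_id]
        self._tokens[user_id].add(token)
        return is_new

    def remove(self, user_id: str, token: str) -> None:
        """Drop a token, e.g. after a send to it fails as unregistered."""
        self._tokens[user_id].discard(token)

    def tokens_for(self, user_id: str) -> List[str]:
        """Return the user's tokens in a stable (sorted) order."""
        return sorted(self._tokens.get(user_id, set()))
```

Keeping a set per user means a browser that re-registers the same token does not create duplicates, and one user can still hold tokens for several devices.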

Backend FCM Push Service

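# shared/fcm_service.py
import asyncio
import logging
from typing import Optional

from firebase_admin import messaging

logger = logging.getLogger(__name__)

class FCMService:
    """Firebase Cloud Messaging service."""

    def __init__(self):
        # Assumes firebase_admin.initialize_app() has already run at startup
        pass

    async def send_notification(
        self,
        fcm_token: str,
        title: str,
        body: str,
        data: Optional[dict] = None
    ) -> bool:
        """Send a push notification; return True on success."""
        try:
            message = messaging.Message(
                notification=messaging.Notification(
                    title=title,
                    body=body
                ),
                data=data or {},  # FCM requires every data value to be a string
                token=fcm_token
            )

            # messaging.send() blocks, so run it in a worker thread
            # (asyncio.to_thread needs Python 3.9+)
            response = await asyncio.to_thread(messaging.send, message)
            logger.info(f"Push notification sent: {response}")
            return True

        except Exception as e:
            logger.error(f"Failed to send push notification: {e}")
            return False

    async def send_chat_completion_notification(
        self,
        fcm_token: str,
        chat_id: str,
        preview: str
    ):
        """Send a notification when the chat reply is ready."""
        # Truncate long previews to 50 characters plus an ellipsis
        short_preview = (preview[:50] + "...") if len(preview) > 50 else preview
        await self.send_notification(
            fcm_token,
            "AI assistant reply ready",
            short_preview,
            {
                "type": "chat_completion",
                "chat_id": chat_id
            }
        )

    async def send_processing_notification(
        self,
        fcm_token: str,
        chat_id: str,
        progress: int
    ):
        """Send a processing-progress notification."""
        # Send once at 50% to avoid flooding the user. Note this only fires
        # if some update reports exactly 50.
        if progress == 50:
            await self.send_notification(
                fcm_token,
                "Processing your question",
                f"Progress: {progress}%, about 10-15 seconds remaining",
                {
                    "type": "processing_update",
                    "chat_id": chat_id,
                    "progress": str(progress)
                }
            )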
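
`send_processing_notification` fires only when `progress` is exactly 50, so a jump from 30 to 60 would skip the notification entirely. One possible alternative, sketched here under the assumption that the caller keeps the previously reported progress, is to detect the update that crosses the threshold. The second helper coerces a payload to strings, since FCM data fields must all be strings. Both function names are hypothetical.

```python
from typing import Any, Dict, Optional


def crossed_threshold(
    previous: Optional[int], current: int, threshold: int = 50
) -> bool:
    """True only on the update that first reaches or passes the threshold."""
    before = previous if previous is not None else 0
    return before < threshold <= current


def to_fcm_data(payload: Dict[str, Any]) -> Dict[str, str]:
    """Coerce keys and values to strings, as FCM data payloads require."""
    return {str(key): str(value) for key, value in payload.items()}
```

With this check, `crossed_threshold(30, 60)` triggers the notification once, while the later update `crossed_threshold(60, 80)` does not.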

7) Deployment Configuration: Complete Docker Compose

Local Development Environment

# docker-compose.dev.yml
version: '3.8'

services:
  # Chat Service
  chat-service:
    build:
      context: .
      dockerfile: services/chat/Dockerfile
    ports:
      - "8080:8080"
    environment:
      - ENVIRONMENT=development
      - GCP_PROJECT_ID=your-project-id
      - FIREBASE_ENABLED=true
      - MEMORY_SERVICE_URL=http://memory-service:8081
      - RAG_SERVICE_URL=http://rag-service:8082
      - GEMINI_API_KEY=${GEMINI_API_KEY}
    depends_on:
      - memory-service
      - rag-service

  # Memory Service
  memory-service:
    build:
      context: .
      dockerfile: services/memory/Dockerfile
    ports:
      - "8081:8080"
    environment:
      - DATABASE_URL=postgresql://postgres:password@postgres:5432/ai_assistant
    depends_on:
      - postgres

  # RAG Service (Agent Builder)
  rag-service:
    build:
      context: .
      dockerfile: services/rag/Dockerfile
    ports:
      - "8082:8080"
    environment:
      - GCP_PROJECT_ID=your-project-id
      - DOCUMENTS_BUCKET=your-project-id-documents

  # PostgreSQL Database
  postgres:
    image: postgres:15
    environment:
      - POSTGRES_DB=ai_assistant
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=password
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data

  # Frontend Development Server
  frontend:
    build:
      context: ./frontend
      dockerfile: Dockerfile.dev
    ports:
      - "3000:3000"
    environment:
      - REACT_APP_API_BASE_URL=http://localhost:8080
      - REACT_APP_FIREBASE_API_KEY=${FIREBASE_API_KEY}
      - REACT_APP_FIREBASE_PROJECT_ID=your-project-id
    volumes:
      - ./frontend/src:/app/src
      - ./frontend/public:/app/public

volumes:
  postgres_data:

Production Deployment Script

#!/bin/bash
# scripts/deploy-with-firebase.sh
set -euo pipefail  # stop at the first failing step

PROJECT_ID="your-project-id"
REGION="asia-east1"

echo "🚀 Deploying the full real-time push system..."

# 1. Deploy backend services
echo "📦 Deploying backend services..."
./scripts/deploy-backend-services.sh

# 2. Deploy Cloud Functions
echo "☁️ Deploying Cloud Functions..."
./scripts/deploy-functions.sh

# 3. Set up Firestore and Firebase
echo "🔥 Setting up Firebase..."
./scripts/setup-firestore.sh

# 4. Deploy the frontend to Firebase Hosting
echo "🌐 Deploying frontend..."
(cd frontend && npm run build && firebase deploy --only hosting)

# 5. FCM has no `firebase deploy` target; the Web Push (VAPID) key is
#    managed in the Firebase console under Project settings > Cloud Messaging
echo "📱 Push notifications: verify the Web Push key in the Firebase console"

# 6. Test the full flow
echo "🧪 Testing real-time push..."
./scripts/test-realtime-flow.sh

echo "✅ Real-time push system deployed!"
echo ""
echo "🔗 Application URLs:"
echo "  Frontend: https://${PROJECT_ID}.web.app"
echo "  API: https://chat-service-xxx.run.app"
echo ""
echo "📊 Monitoring dashboards:"
echo "  Firebase console: https://console.firebase.google.com/project/${PROJECT_ID}"
echo "  Cloud Functions logs: https://console.cloud.google.com/functions"

Integration Test Script

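#!/bin/bash
# scripts/test-realtime-flow.sh

echo "🧪 Testing the real-time push flow..."

API_BASE="https://chat-service-xxx.run.app"
TEST_USER_ID="test-user-123"
TEST_MESSAGE="Please analyze the development trends of artificial intelligence"

# 1. Create a chat session
echo "1. Creating a chat session..."
# Build the JSON with jq so spaces and quotes in the message are escaped safely
PAYLOAD=$(jq -n --arg message "$TEST_MESSAGE" --arg user_id "$TEST_USER_ID" \
  '{message: $message, user_id: $user_id, processing_mode: "async"}')
CHAT_RESPONSE=$(curl -s -X POST "$API_BASE/chat" \
  -H "Content-Type: application/json" \
  -d "$PAYLOAD")

CHAT_ID=$(echo "$CHAT_RESPONSE" | jq -r '.chat_id')
if [ -z "$CHAT_ID" ] || [ "$CHAT_ID" = "null" ]; then
  echo "❌ No chat_id in response: $CHAT_RESPONSE"
  exit 1
fi
echo "Chat ID: $CHAT_ID"

# 2. Monitor Firestore updates (simulated)
echo "2. Simulating Firestore update monitoring..."
for i in {1..30}; do
  echo "Checking progress... ($i/30)"

  # A real test would read the chat document from Firestore here.
  # For now we only simulate the wait.
  sleep 2

  if [ "$i" -eq 15 ]; then
    echo "✅ Progress update detected: RAG retrieval complete"
  fi

  if [ "$i" -eq 25 ]; then
    echo "✅ Progress update detected: LLM generation complete"
  fi
done

echo "✅ Real-time push flow test complete"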
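
The loop in the test script only simulates progress with fixed sleeps. A real check could poll the chat document until it reaches a terminal status. The sketch below assumes the `status`, `current_step`, and `progress` fields from the Firestore schema in section 2. The function name `wait_for_completion` and its parameters are hypothetical, and the fetch function is injected so the polling logic can be exercised without a live Firestore instance.

```python
import time
from typing import Callable, Dict, Optional


def wait_for_completion(
    fetch_chat: Callable[[], Optional[Dict]],
    timeout_s: float = 60.0,
    interval_s: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict:
    """Poll fetch_chat() until the chat status is 'completed' or 'error'."""
    deadline = time.monotonic() + timeout_s
    last_step = None
    while True:
        chat = fetch_chat() or {}
        step = chat.get("current_step")
        if step != last_step:
            # Log each step transition once
            print(f"step: {step} ({chat.get('progress', 0)}%)")
            last_step = step
        if chat.get("status") in ("completed", "error"):
            return chat
        if time.monotonic() >= deadline:
            raise TimeoutError("chat did not finish before the timeout")
        sleep(interval_s)
```

With the `google-cloud-firestore` client, `fetch_chat` could be something like `lambda: db.collection('chats').document(chat_id).get().to_dict()`.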

8) Monitoring and Performance Optimization

Firestore Usage Metrics Monitoring

# shared/firestore_monitor.py
from google.cloud import firestore
from google.cloud import monitoring_v3
import time
import logging

logger = logging.getLogger(__name__)

class FirestoreMonitor:
    """Firestore usage monitoring."""

    def __init__(self, project_id: str):
        self.project_id = project_id
        self.db = firestore.Client(project=project_id)
        self.monitoring_client = monitoring_v3.MetricServiceClient()
        self.project_name = f"projects/{project_id}"

    async def record_chat_metrics(
        self,
        chat_id: str,
        user_id: str,
        processing_time_ms: int,
        message_count: int
    ):
        """Record per-chat metrics."""
        try:
            # Write to the Firestore stats collection
            stats_ref = self.db.collection('chat_stats').document(chat_id)
            stats_ref.set({
                'user_id': user_id,
                'processing_time_ms': processing_time_ms,
                'message_count': message_count,
                'timestamp': firestore.SERVER_TIMESTAMP
            })

            # Write to Cloud Monitoring
            await self._record_processing_time(processing_time_ms)
            await self._record_message_count(message_count)

        except Exception as e:
            logger.error(f"Failed to record chat metrics: {e}")

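    async def _record_processing_time(self, processing_time_ms: int):
        """Record the processing-time metric."""
        self._write_int_metric(
            "custom.googleapis.com/chat/processing_time", processing_time_ms
        )

    async def _record_message_count(self, message_count: int):
        """Record the message-count metric (the metric type name is an assumption)."""
        self._write_int_metric(
            "custom.googleapis.com/chat/message_count", message_count
        )

    def _write_int_metric(self, metric_type: str, value: int):
        """Write one int64 data point to a custom Cloud Monitoring metric."""
        try:
            series = monitoring_v3.TimeSeries()
            series.metric.type = metric_type
            series.resource.type = "global"

            # Split the current time into whole seconds and nanoseconds
            now = time.time()
            seconds = int(now)
            nanos = int((now - seconds) * 10 ** 9)

            interval = monitoring_v3.TimeInterval({
                "end_time": {"seconds": seconds, "nanos": nanos}
            })

            point = monitoring_v3.Point({
                "interval": interval,
                "value": {"int64_value": value}
            })

            series.points = [point]
            self.monitoring_client.create_time_series(
                name=self.project_name,
                time_series=[series]
            )

        except Exception as e:
            logger.error(f"Failed to write metric {metric_type}: {e}")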

    async def get_daily_stats(self, date: str = None) -> dict:
        """Get daily statistics for an ISO date (YYYY-MM-DD); defaults to today in UTC."""
        try:
            from datetime import datetime, timedelta, timezone

            if not date:
                date = datetime.now(timezone.utc).date().isoformat()

            # `timestamp` is stored as a Firestore timestamp, so filter with
            # datetime bounds; string bounds would not match timestamp values
            day_start = datetime.fromisoformat(date).replace(tzinfo=timezone.utc)
            day_end = day_start + timedelta(days=1)

            # Query the stats for that day as a half-open range [start, end)
            stats_query = self.db.collection('chat_stats') \
                .where('timestamp', '>=', day_start) \
                .where('timestamp', '<', day_end)

            docs = stats_query.stream()

            total_chats = 0
            total_processing_time = 0
            total_messages = 0

            for doc in docs:
                data = doc.to_dict()
                total_chats += 1
                total_processing_time += data.get('processing_time_ms', 0)
                total_messages += data.get('message_count', 0)

            return {
                'date': date,
                'total_chats': total_chats,
                'average_processing_time_ms': total_processing_time / total_chats if total_chats > 0 else 0,
                'total_messages': total_messages,
                'average_messages_per_chat': total_messages / total_chats if total_chats > 0 else 0
            }

        except Exception as e:
            logger.error(f"Failed to get daily stats: {e}")
            return {}
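
`chat_stats` documents store `timestamp` as a server timestamp, so a daily query needs datetime bounds rather than date strings. The sketch below isolates the two pure pieces of that logic: computing a half-open UTC day range and aggregating rows. The function names are hypothetical, and treating days as UTC days is an assumption; a deployment serving one time zone might prefer local midnight.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, Iterable, Tuple


def utc_day_bounds(day: str) -> Tuple[datetime, datetime]:
    """Return the half-open range [start, end) for an ISO date in UTC."""
    start = datetime.fromisoformat(day).replace(tzinfo=timezone.utc)
    return start, start + timedelta(days=1)


def summarize(rows: Iterable[Dict]) -> Dict[str, float]:
    """Aggregate chat_stats rows into totals and per-chat averages."""
    total_chats = 0
    total_time_ms = 0
    total_messages = 0
    for row in rows:
        total_chats += 1
        total_time_ms += row.get("processing_time_ms", 0)
        total_messages += row.get("message_count", 0)
    return {
        "total_chats": total_chats,
        "average_processing_time_ms": total_time_ms / total_chats if total_chats else 0,
        "total_messages": total_messages,
        "average_messages_per_chat": total_messages / total_chats if total_chats else 0,
    }
```

Using `>= start` and `< end` avoids dropping documents written in the last second of the day, which a `T23:59:59` upper bound would miss.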

Performance Optimization Suggestions

# shared/performance_optimizer.py
from typing import List, Dict, Any
import logging

logger = logging.getLogger(__name__)

class FirebaseOptimizer:
    """Firebase performance optimizer."""

    @staticmethod
    def batch_firestore_writes(updates: List[Dict[str, Any]]) -> List[List[Dict[str, Any]]]:
        """Split updates into groups sized for batched writes."""
        # Group the updates so each group can be sent as one batched write,
        # with at most 500 operations per batch
        batches = []
        current_batch = []

        for update in updates:
            current_batch.append(update)
            if len(current_batch) >= 500:
                batches.append(current_batch)
                current_batch = []

        if current_batch:
            batches.append(current_batch)

        return batches

    @staticmethod
    def optimize_listener_queries(user_id: str) -> Dict[str, Any]:
        """Build a narrow Firestore listener query."""
        # Listen only to this user's documents to avoid unnecessary reads
        return {
            'collection': 'chats',
            'where': [('user_id', '==', user_id)],
            'orderBy': ('updated_at', 'desc'),
            'limit': 10  # listen to the 10 most recent chats only
        }

    @staticmethod
    async def cleanup_old_chats(firebase_client, days: int = 30):
        """Delete chats not updated in the last `days` days (up to 100 per call)."""
        try:
            from datetime import datetime, timedelta, timezone

            # Use an aware UTC datetime so the comparison matches Firestore timestamps
            cutoff_date = datetime.now(timezone.utc) - timedelta(days=days)

            # Query old chats
            old_chats_query = firebase_client.db.collection('chats') \
                .where('updated_at', '<', cutoff_date) \
                .limit(100)  # process in chunks; call again to drain a backlog

            docs = old_chats_query.stream()

            deleted_count = 0
            for doc in docs:
                doc.reference.delete()
                deleted_count += 1

            logger.info(f"Cleaned up {deleted_count} old chat records")

        except Exception as e:
            logger.error(f"Failed to clean up old chat records: {e}")
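
`batch_firestore_writes` only groups the updates; committing each group is left to the caller. A minimal sketch of that step follows, assuming `db` is a `google.cloud.firestore.Client` (or anything exposing the same `batch()` / `set()` / `commit()` interface) and that each write is a `(document_ref, data)` pair. The helper names `chunked` and `commit_in_batches` are hypothetical.

```python
from typing import Any, Dict, Iterator, List, Tuple

MAX_BATCH_OPS = 500  # the per-batch cap used in batch_firestore_writes


def chunked(items: List[Any], size: int = MAX_BATCH_OPS) -> Iterator[List[Any]]:
    """Yield consecutive slices of at most `size` items."""
    if size <= 0:
        raise ValueError("size must be positive")
    for start in range(0, len(items), size):
        yield items[start:start + size]


def commit_in_batches(
    db: Any, writes: List[Tuple[Any, Dict]], size: int = MAX_BATCH_OPS
) -> int:
    """Commit (document_ref, data) pairs in batches; return the commit count."""
    commits = 0
    for group in chunked(writes, size):
        batch = db.batch()
        for ref, data in group:
            # merge=True updates the given fields without replacing the document
            batch.set(ref, data, merge=True)
        batch.commit()
        commits += 1
    return commits
```

Each batch commits atomically, but separate batches do not: if the third commit fails, the first two are already applied.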


